{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TFX pipeline example - Chicago Taxi tips prediction\n",
    "\n",
    "## Overview\n",
    "[Tensorflow Extended (TFX)](https://github.com/tensorflow/tfx) is a Google-production-scale machine\n",
    "learning platform based on TensorFlow. It provides a configuration framework to express ML pipelines\n",
    "consisting of TFX components, which brings the user large-scale ML task orchestration, artifact lineage, as well as the power of various [TFX libraries](https://www.tensorflow.org/resources/libraries-extensions). Kubeflow Pipelines can be used as the orchestrator supporting the \n",
    "execution of a TFX pipeline.\n",
    "\n",
    "This sample demonstrates how to author a ML pipeline in TFX and run it on a KFP deployment. \n",
    "\n",
    "## Permission\n",
    "\n",
    "This pipeline requires Google Cloud Storage permission to run. \n",
    "If KFP was deployed through K8S marketplace, please make sure **\"Allow access to the following Cloud APIs\"** is checked when creating the cluster. <img src=\"check_permission.png\">\n",
    "Otherwise, follow instructions in [the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials) to guarantee at least, that the service account has `storage.admin` role."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!python3 -m pip install pip --upgrade --quiet --user\n",
    "!python3 -m pip install kfp --upgrade --quiet --user\n",
    "!python3 -m pip install tfx==0.22.0 --quiet --user"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Note: if you're warned by \n",
    "```\n",
    "WARNING: The script {LIBRARY_NAME} is installed in '/home/jupyter/.local/bin' which is not on PATH.\n",
    "```\n",
    "You might need to fix by running the next cell and restart the kernel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "# Set `PATH` to include user python binary directory and a directory containing `skaffold`.\n",
    "PATH=%env PATH\n",
    "%env PATH={PATH}:/home/jupyter/.local/bin"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example we'll need TFX SDK later than 0.21 to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
    "\n",
    "## RuntimeParameter in TFX DSL\n",
    "Currently, TFX DSL only supports parameterizing field in the `PARAMETERS` section of `ComponentSpec`, see [here](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/types/component_spec.py#L126). This prevents runtime-parameterizing the pipeline topology. Also, if the declared type of the field is a protobuf, the user needs to pass in a dictionary with exactly the same names for each field, and specify one or more value as `RuntimeParameter` objects. In other word, the dictionary should be able to be passed in to [`ParseDict()` method](https://github.com/protocolbuffers/protobuf/blob/04a11fc91668884d1793bff2a0f72ee6ce4f5edd/python/google/protobuf/json_format.py#L433) and produce the correct pb message."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from typing import Text\n",
    "\n",
    "import kfp\n",
    "\n",
    "import tensorflow_model_analysis as tfma\n",
    "\n",
    "from tfx.components import Evaluator\n",
    "from tfx.components import CsvExampleGen\n",
    "from tfx.components import ExampleValidator\n",
    "from tfx.components import Pusher\n",
    "from tfx.components import SchemaGen\n",
    "from tfx.components import StatisticsGen\n",
    "from tfx.components import Trainer\n",
    "from tfx.components import Transform\n",
    "from tfx.orchestration import data_types\n",
    "from tfx.orchestration import pipeline\n",
    "from tfx.orchestration.kubeflow import kubeflow_dag_runner\n",
    "from tfx.proto import pusher_pb2\n",
    "from tfx.utils.dsl_utils import external_input"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# In TFX MLMD schema, pipeline name is used as the unique id of each pipeline.\n",
    "# Assigning workflow ID as part of pipeline name allows the user to bypass\n",
    "# some schema checks which are redundant for experimental pipelines.\n",
    "pipeline_name = 'taxi_pipeline_with_parameters'\n",
    "\n",
    "# Path of pipeline data root, should be a GCS path.\n",
    "# Note that when running on KFP, the pipeline root is always a runtime parameter.\n",
    "# The value specified here will be its default.\n",
    "pipeline_root = os.path.join('gs://{{kfp-default-bucket}}', 'tfx_taxi_simple',\n",
    "                              kfp.dsl.RUN_ID_PLACEHOLDER)\n",
    "\n",
    "# Location of input data, should be a GCS path under which there is a csv file.\n",
    "data_root_param = data_types.RuntimeParameter(\n",
    "    name='data-root',\n",
    "    default='gs://ml-pipeline-playground/tfx_taxi_simple/data',\n",
    "    ptype=Text,\n",
    ")\n",
    "\n",
    "# Path to the module file, GCS path.\n",
    "# Module file is one of the recommended way to provide customized logic for component\n",
    "# includeing Trainer and Transformer.\n",
    "# See https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/components/trainer/component.py#L38\n",
    "taxi_module_file_param = data_types.RuntimeParameter(\n",
    "    name='module-file',\n",
    "    default='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py',\n",
    "    ptype=Text,\n",
    ")\n",
    "\n",
    "# Number of epochs in training.\n",
    "train_steps = data_types.RuntimeParameter(\n",
    "    name='train-steps',\n",
    "    default=10,\n",
    "    ptype=int,\n",
    ")\n",
    "\n",
    "# Number of epochs in evaluation.\n",
    "eval_steps = data_types.RuntimeParameter(\n",
    "    name='eval-steps',\n",
    "    default=5,\n",
    "    ptype=int,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## TFX Components\n",
    "\n",
    "Please refer to the [official guide](https://www.tensorflow.org/tfx/guide#tfx_pipeline_components) for the detailed explanation and purpose of each TFX component."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The input data location is parameterized by _data_root_param\n",
    "examples = external_input(data_root_param)\n",
    "example_gen = CsvExampleGen(input=examples)\n",
    "\n",
    "statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])\n",
    "\n",
    "infer_schema = SchemaGen(\n",
    "    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)\n",
    "\n",
    "validate_stats = ExampleValidator(\n",
    "  statistics=statistics_gen.outputs['statistics'],\n",
    "  schema=infer_schema.outputs['schema'])\n",
    "\n",
    "# The module file used in Transform and Trainer component is paramterized by\n",
    "# _taxi_module_file_param.\n",
    "transform = Transform(\n",
    "  examples=example_gen.outputs['examples'],\n",
    "  schema=infer_schema.outputs['schema'],\n",
    "  module_file=taxi_module_file_param)\n",
    "\n",
    "# The numbers of steps in train_args are specified as RuntimeParameter with\n",
    "# name 'train-steps' and 'eval-steps', respectively.\n",
    "trainer = Trainer(\n",
    "  module_file=taxi_module_file_param,\n",
    "  transformed_examples=transform.outputs['transformed_examples'],\n",
    "  schema=infer_schema.outputs['schema'],\n",
    "  transform_graph=transform.outputs['transform_graph'],\n",
    "  train_args={'num_steps': train_steps},\n",
    "  eval_args={'num_steps': eval_steps})\n",
    "\n",
    "# Set the TFMA config for Model Evaluation and Validation.\n",
    "eval_config = tfma.EvalConfig(\n",
    "    model_specs=[\n",
    "      # Using signature 'eval' implies the use of an EvalSavedModel. To use\n",
    "      # a serving model remove the signature to defaults to 'serving_default'\n",
    "      # and add a label_key.\n",
    "      tfma.ModelSpec(signature_name='eval')\n",
    "    ],\n",
    "    metrics_specs=[\n",
    "      tfma.MetricsSpec(\n",
    "          # The metrics added here are in addition to those saved with the\n",
    "          # model (assuming either a keras model or EvalSavedModel is used).\n",
    "          # Any metrics added into the saved model (for example using\n",
    "          # model.compile(..., metrics=[...]), etc) will be computed\n",
    "          # automatically.\n",
    "          metrics=[\n",
    "              tfma.MetricConfig(class_name='ExampleCount')\n",
    "          ],\n",
    "          # To add validation thresholds for metrics saved with the model,\n",
    "          # add them keyed by metric name to the thresholds map.\n",
    "          thresholds = {\n",
    "              'binary_accuracy': tfma.MetricThreshold(\n",
    "                  value_threshold=tfma.GenericValueThreshold(\n",
    "                      lower_bound={'value': 0.5}),\n",
    "                  change_threshold=tfma.GenericChangeThreshold(\n",
    "                     direction=tfma.MetricDirection.HIGHER_IS_BETTER,\n",
    "                     absolute={'value': -1e-10}))\n",
    "          }\n",
    "      )\n",
    "    ],\n",
    "    slicing_specs=[\n",
    "      # An empty slice spec means the overall slice, i.e. the whole dataset.\n",
    "      tfma.SlicingSpec(),\n",
    "      # Data can be sliced along a feature column. In this case, data is\n",
    "      # sliced along feature column trip_start_hour.\n",
    "      tfma.SlicingSpec(feature_keys=['trip_start_hour'])\n",
    "    ])\n",
    "\n",
    "# The name of slicing column is specified as a RuntimeParameter.\n",
    "evaluator = Evaluator(\n",
    "  examples=example_gen.outputs['examples'],\n",
    "  model=trainer.outputs['model'],\n",
    "  eval_config=eval_config)\n",
    "\n",
    "pusher = Pusher(\n",
    "  model=trainer.outputs['model'],\n",
    "  model_blessing=evaluator.outputs['blessing'],\n",
    "  push_destination=pusher_pb2.PushDestination(\n",
    "      filesystem=pusher_pb2.PushDestination.Filesystem(\n",
    "          base_directory=os.path.join(\n",
    "              str(pipeline.ROOT_PARAMETER), 'model_serving'))))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the DSL pipeline object.\n",
    "# This pipeline obj carries the business logic of the pipeline, but no runner-specific information\n",
    "# was included.\n",
    "dsl_pipeline = pipeline.Pipeline(\n",
    "  pipeline_name=pipeline_name,\n",
    "  pipeline_root=pipeline_root,\n",
    "  components=[\n",
    "      example_gen, statistics_gen, infer_schema, validate_stats, transform,\n",
    "      trainer, model_analyzer, model_validator, pusher\n",
    "  ],\n",
    "  enable_cache=True,\n",
    "  beam_pipeline_args=['--direct_num_workers=%d' % 0],\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Specify a TFX docker image. For the full list of tags please see:\n",
    "# https://hub.docker.com/r/tensorflow/tfx/tags\n",
    "tfx_image = 'gcr.io/tfx-oss-public/tfx:0.22.0'\n",
    "config = kubeflow_dag_runner.KubeflowDagRunnerConfig(\n",
    "      kubeflow_metadata_config=kubeflow_dag_runner\n",
    "      .get_default_kubeflow_metadata_config(),\n",
    "      tfx_image=tfx_image)\n",
    "kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)\n",
    "# KubeflowDagRunner compiles the DSL pipeline object into KFP pipeline package.\n",
    "# By default it is named <pipeline_name>.tar.gz\n",
    "kfp_runner.run(dsl_pipeline)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "run_result = kfp.Client(\n",
    "    host='1234567abcde-dot-us-central2.pipelines.googleusercontent.com'  # Put your KFP endpoint here\n",
    ").create_run_from_pipeline_package(\n",
    "    pipeline_name + '.tar.gz', \n",
    "    arguments={\n",
    "        # Uncomment following lines in order to use custom GCS bucket/module file/training data.\n",
    "        # 'pipeline-root': 'gs://<your-gcs-bucket>/tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
    "        # 'module-file': '<gcs path to the module file>',  # delete this line to use default module file.\n",
    "        # 'data-root': '<gcs path to the data>'  # delete this line to use default data.\n",
    "})"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.16"
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "metadata": {
     "collapsed": false
    },
    "source": []
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
